先前我們在 DAY 06 說明了兩種基本的 materializations: view 和 table。
今天要來介紹另一個 materialization: incremental。
又是一件非常複雜的事情,但除非專案規模很小,不然大多數都會需要用到,中度以上的使用者都需要了解。
View 和 Table 兩種 materializations 各有不同特性:
在 Teamson 我們使用 dbt 建議的三層架構,在 staging 和 intermediate 通常會使用 view,而在 marts 會開始考慮把一些 model 跑成 table。兩個主要理由:
問題來了:把這些資料全都跑成 table 要花很多時間,例如每個小時的定期更新,跑超過 30 分鐘才完成,有什麼辦法改善呢?
這就是我們接下來要介紹的: Incremental Materialization
Incremental refresh 就是每次更新的時候,如果不希望整張表都更新,可以只更新差份。
接下來我要舉一個虛構的例子說明如何定義一個 incremental model。
我們的訂單的資料流如下:
stg_sales_orders -> int_sales_orders -> sales_orders
其中 stg_sales_orders 和 int_sales_orders 我們跑成 view,而 sales_orders 要跑成 table。
排程為每小時一次,資料量為百萬以上。
用欄位 updated_at 來判斷哪些資料有異動(包含新增)。
為了節省時間以及運算資源,我們要用 incremental materialization,只更新過去一個小時新增或異動的資料,而非跑整張表,一般的 table materialization。
和 view/table 類似,我們要在 sales_orders.sql 加上 config -> materialized。
{{ config(materialized='incremental') }}
select * from {{ ref('int_sales_orders') }}
除此之外,我們還要定義哪些資料要更新,我們希望只更新上次更新過後,有異動的資料(包含新增)。
{{ config(materialized='incremental') }}
select * from {{ ref('int_sales_orders') }}
{% if is_incremental() %}
where updated_at >= (select max(updated_at) from {{ this }})
{% endif %}
上面這段語法中,我們加上 if is_incremental()
的判斷,如果要跑 incremental 時,我們只篩選出 updated_at >= (select max(updated_at) from {{ this }})
的資料。
{{this}}
就是目標 table 本身,先抓上一次更新時,最新一筆的更新日期到什麼時候。
假設上一次更新時,資料到 updated_at = 10/16 08:30:15
,那我們就更新 updated_at >= 10/16 08:30:15
的資料。
請注意這裡我們下的是 >=
,想像 10/16 08:30:15 不只有一筆訂單,部份包含在前一次更新,部份不包含。如果下 >
的話,就會有資料漏掉。
那麼,下了 >=
不就會有資料重複的問題嗎?
對的,所以我們需要再加上 unique_key 的參數。這樣就會依 order_id 判斷,資料不重複 insert。
{{ config(materialized='incremental', unique_key='order_id') }}
select * from {{ ref('int_sales_orders') }}
{% if is_incremental() %}
where updated_at >= (select max(updated_at) from {{ this }})
{% endif %}
這樣就完成基本的 incremental materialization 了。
並不是加上了 materialized='incremental'
每一次更新的時候就只挑最近一小時的資料,這麼簡單。
上述的語法,會用 is_incremental()
判斷,true 的時候才會用到 updated_at 的條件。
考慮以下情境
dbt run
或 dbt build
時,加上 --full-refresh
時,就需要強制更新全表。因此,會觸發 is_incremental()
= true,需要同時滿足以下三個條件
--full-refresh
因為 incremental 不把整個 table 重建,而是每次更新的時候,把資料一次一次往上疊。如果 model 定義改變,欄位與先前不同,要如何處理呢?這時候會用到的是 on_schema_change 參數。
有以下幾種模式:
如果沒有指定的話,預設是 ignore。如果需要調整的話,就加入 config。
舉例,把 on_schema_change 設為 fail:
{{ config(materialized='incremental', unique_key='order_id', on_schema_change='fail') }}
select * from {{ ref('int_sales_orders') }}
{% if is_incremental() %}
where updated_at >= (select max(updated_at) from {{ this }})
{% endif %}
如果不想每個 model 都設一次,同樣可以從 dbt_project.yml 一次加。
我的習慣是,在正式環境上版時,一定會跑 full refresh,所以不管選哪一個都沒有差。
但在開發時,如果沒有搞懂 on_schema_change 如何運作,就會無限鬼打牆。
只要有用到 incremental,建議一定要了解 on_schema_change。
剛開始用 Incremental 的時候,incremental 跟 full refresh,時間完全一樣,一點都沒有變快。
我很確定我用錯了!後來花很多力氣,才有明顯改善。我做了這些調整:
我了解資料庫效能優化需要善用 index,但是我一開始用 SAP 原始資料的更新時間,日期和時間是拆開的兩個欄位。我先把兩個欄位合在一起變成 update_timestamp 再進行判斷。改了好幾次都還是超慢。
我們的資料是用 Stitch 從來源 replicate 到我們的 PostgreSQL 報表資料庫,所以後來我發現,可以用 Stitch 為了資料複寫所加上去的欄位。
找到了單一欄位可以使用,事情就變得簡單很多。
找到了單一欄位可以使用,我就能在資料源,用於判斷 update_at 的欄位,設定 index。
最後,由 dbt 產出的目標 table 我也有設定 index,希望判斷 unique key 的時候不會花太久時間。
如果有兩個 incremental model: table_1, table_2,下游有個 table_3 同時用到那兩個 table,也希望設計為 incremental model,該怎麼做呢?
我的方法是,table_3 同樣定義一個欄位 updated_at 為 greatest(table_1.updated_at, table.updated_at)
,is_incremental 的判斷就放
{% if is_incremental() %}
where
greatest(table_1.updated_at, table_2.updated_at)
>= (select max(updated_at) from {{ this }})
{% endif %}
如果來源的資料,有資料被刪除的話,也需要納入考量。
我沒看過相關的討論,如果有的話,歡迎留言給我。
dbt: The key to Scaling Your Data Team with Grace
https://www.youtube.com/watch?v=iBNSEbKnFBI&t=7275s (大約為 2:00:00 - 2:30:00)
這是 Jolanda 先前在 dbt Taipei Meetup 分享的錄影,是我印象特別深刻的一場。
全程英文,但她用很生動易懂的例子,說明了她在 Wise 使用及優化 incremental strategy 的心路歷程。
當時我也深受更新時間所苦,聽了特別有感觸。
依據 dbt 官方的建議
- 🔍 Start with a view. When the view gets too long to query for end users,
- ⚒️ Make it a table. When the table gets too long to build in your dbt Jobs,
- 📚 Build it incrementally. That is, layer the data on in chunks as it comes in.
先從 view 開始,如果太慢的話,改用 table。
如果 table 跑的時間太久,再用 incremental。
雖然 incremental 很重要,但是學問也很深。
如果還沒有實際開始導入的朋友們,讀到這邊可能會覺得很難理解,建議可以照順序一步一步導入之後,等需要用到 incremental 的時候再來細讀。
明天的主題:Exposures, Tags, Targets
歡迎加入 dbt community
對 dbt 或 data 有興趣 👋?歡迎加入 dbt community 到 #local-taipei 找我們,也有實體 Meetup 請到 dbt Taipei Meetup 報名參加